package com.iot.amqp.examples;

import com.iot.amqp.AmqpClient;
import com.iot.amqp.AmqpClientOptions;
import com.iot.amqp.AmqpConstants;
import lombok.extern.slf4j.Slf4j;

import javax.jms.Message;
import javax.jms.MessageConsumer;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

/**
 * Example that actively pulls AMQP messages from a single queue.
 *
 * <p>{@link #start()} creates an {@link AmqpClient}, opens a consumer on
 * {@link AmqpConstants#DEFAULT_QUEUE} and submits a loop to a single-threaded executor that
 * repeatedly calls {@link MessageConsumer#receive()}. {@link #stop()} raises the close flag,
 * shuts the executor down and closes the client.
 */
@Slf4j
public class PullMessage extends AbstractAmqpExample {
    private AmqpClient amqpClient;

    /**
     * Dedicated single-thread executor on which the pull loop runs.
     */
    private final ExecutorService executorService = new ThreadPoolExecutor(1, 1, 60, TimeUnit.SECONDS,
            new LinkedBlockingQueue<>(1));

    /**
     * Set to {@code true} by {@link #stop()} so that the pull loop terminates.
     */
    private final AtomicBoolean isClose = new AtomicBoolean();

    /**
     * Creates the client and consumer, then starts the pull loop on the worker thread.
     * Any initialization failure is logged and not rethrown.
     */
    @Override
    protected void start() {
        isClose.set(false);
        try {
            amqpClient = createAmqpClient();
            MessageConsumer consumer = amqpClient.newConsumer(AmqpConstants.DEFAULT_QUEUE);
            executorService.execute(() -> pullLoop(consumer));
        } catch (Exception e) {
            log.warn("AmqpClient initialize error,", e);
        }
    }

    /**
     * Signals the pull loop to finish, shuts down the executor and closes the client if one
     * was created.
     */
    @Override
    protected void stop() {
        isClose.set(true);
        executorService.shutdown();
        if (amqpClient != null) {
            amqpClient.close();
        }
    }

    /**
     * Receives and processes messages until {@link #stop()} is called or the consumer is closed.
     *
     * @param consumer consumer to pull messages from
     */
    private void pullLoop(MessageConsumer consumer) {
        while (!isClose.get()) {
            try {
                Message message = consumer.receive();
                if (message == null) {
                    // Per the JMS contract, receive() returns null only when the consumer has
                    // been closed concurrently; no further message can arrive, so leave the
                    // loop instead of dereferencing null and logging a misleading NPE.
                    if (!isClose.get()) {
                        log.warn("consumer closed unexpectedly, stop pulling messages");
                    }
                    break;
                }
                // Process the message here. If processing is slow, hand it off to another
                // thread; otherwise the heartbeat may time out and the connection may drop.
                processMessage(message.getBody(String.class));
                // If options.isAutoAcknowledge == false, call message.acknowledge() here.
            } catch (Exception e) {
                log.warn("receive message error,", e);
            }
        }
    }

    /**
     * Creates and initializes a single {@link AmqpClient}. Adjust the parameters to your
     * environment in real use.
     *
     * @return the initialized client
     * @throws Exception if the client cannot be created or initialized
     */
    private AmqpClient createAmqpClient() throws Exception {
        AmqpClientOptions options = AmqpClientOptions.builder()
                .host(AmqpConstants.HOST)
                .port(AmqpConstants.PORT)
                .accessKey(AmqpConstants.ACCESS_KEY)
                .accessCode(AmqpConstants.ACCESS_CODE)
                // The SDK allocates an in-memory queue of this size to receive messages;
                // lower it when the client has little memory.
                .queuePrefetch(100)
                .build();
        AmqpClient amqpClient = new AmqpClient(options);
        amqpClient.initialize();
        return amqpClient;
    }
}
